/*
 * Copyright 2018 NAVER Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.navercorp.pinpoint.flink.receiver;


import com.navercorp.pinpoint.collector.handler.SimpleHandler;
import com.navercorp.pinpoint.flink.vo.RawData;
import com.navercorp.pinpoint.io.request.ServerRequest;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;


/**
 * @author minwoo.jung
 */
public class AgentStatHandler extends SourceContextManager implements SimpleHandler {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void handleSimple(ServerRequest serverRequest) {
        if (!(serverRequest.getData() instanceof TBase<?, ?>)) {
            throw new UnsupportedOperationException("data is not support type : " + serverRequest.getData());
        }

        final TBase<?, ?> tBase = (TBase<?, ?>) serverRequest.getData();
        final Map<String, String> metaInfo = new HashMap<>(serverRequest.getHeaderEntity().getEntityAll());
        final RawData rawData = new RawData(tBase, metaInfo);
        final SourceContext sourceContext = roundRobinSourceContext();

        if (sourceContext == null) {
            logger.warn("sourceContext is null.");
            return;
        }

        sourceContext.collect(rawData);

    }
}
